buffer responses to compute programs in a TQueue
authorJoey Hess <joeyh@joeyh.name>
Tue, 11 Mar 2025 16:40:21 +0000 (12:40 -0400)
committerJoey Hess <joeyh@joeyh.name>
Tue, 11 Mar 2025 16:40:21 +0000 (12:40 -0400)
This avoids a potential problem where the program sends several INPUT
before reading responses, so flushing the respose to the pipe could
block. It's unlikely, but seemed worth making sure it can't happen.

Remote/Compute.hs
doc/todo/compute_special_remote_remaining_todos.mdwn

index 0b27d135ba4f548948a6bb16c4906d91e9189980..2ef7844808906ec5d6a70f6cd60d2f58c85c683c 100644 (file)
@@ -435,10 +435,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                showOutput
                starttime <- liftIO currentMonotonicTimestamp
                let startresult = ComputeProgramResult state False False False
-               result <- withmeterfile $ \meterfile -> bracket
-                       (liftIO $ createProcess pr)
-                       (liftIO . cleanupProcess)
-                       (getinput tmpdir subdir startresult meterfile)
+               result <- withmeterfile $ \meterfile -> 
+                       bracket
+                               (liftIO $ createProcess pr)
+                               (liftIO . cleanupProcess) $ \p -> 
+                                       withoutputv p $
+                                               getinput tmpdir subdir startresult meterfile p
                endtime <- liftIO currentMonotonicTimestamp
                liftIO $ checkoutputs result subdir
                cont result subdir (calcduration starttime endtime)
@@ -453,14 +455,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                        , return tmpdir
                        )
        
-       getinput tmpdir subdir result meterfile p = 
+       getinput tmpdir subdir result meterfile p outputv 
                liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
                        Just l
-                               | null l -> getinput tmpdir subdir result meterfile p
+                               | null l -> getinput tmpdir subdir result meterfile p outputv
                                | otherwise -> do
                                        fastDebug "Compute" ("< " ++ l)
-                                       result' <- parseoutput p tmpdir subdir result meterfile l
-                                       getinput tmpdir subdir result' meterfile p
+                                       result' <- parseoutput outputv tmpdir subdir result meterfile l
+                                       getinput tmpdir subdir result' meterfile p outputv
                        Nothing -> do
                                liftIO $ hClose (stdoutHandle p)
                                liftIO $ hClose (stdinHandle p)
@@ -468,19 +470,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                                        giveup $ program ++ " exited unsuccessfully"
                                return result
        
-       sendresponse p s = do
-               fastDebug "Compute" ("> " ++ s)
-               liftIO $ hPutStrLn (stdinHandle p) s
-               liftIO $ hFlush (stdinHandle p)
-
-       parseoutput p tmpdir subdir result meterfile l = case Proto.parseMessage l of
-               Just (ProcessInput f) -> handleinput f False p tmpdir subdir result
-               Just (ProcessInputRequired f) -> handleinput f True p tmpdir subdir result
+       parseoutput outputv tmpdir subdir result meterfile l = case Proto.parseMessage l of
+               Just (ProcessInput f) -> handleinput f False outputv tmpdir subdir result
+               Just (ProcessInputRequired f) -> handleinput f True outputv tmpdir subdir result
                Just (ProcessOutput f) -> do
                        let f' = toOsPath f
                        checksafefile tmpdir subdir f' "output"
                        -- Modify filename so eg "-foo" becomes "./-foo"
-                       sendresponse p $ toCommand' (File f)
+                       sendresponse outputv $ toCommand' (File f)
                        -- If the output file is in a subdirectory, make
                        -- the directories so the compute program doesn't
                        -- need to.
@@ -508,7 +505,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                Just ProcessSandbox -> do
                        sandboxpath <- liftIO $ fromOsPath <$>
                                relPathDirToFile subdir tmpdir
-                       sendresponse p $
+                       sendresponse outputv $
                                if null sandboxpath
                                        then "."
                                        else sandboxpath
@@ -516,7 +513,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                Nothing -> giveup $
                        program ++ " output an unparseable line: \"" ++ l ++ "\""
 
-       handleinput f required p tmpdir subdir result = do
+       handleinput f required outputv tmpdir subdir result = do
                let f' = toOsPath f
                let knowninput = M.member f'
                        (computeInputs (computeState result))
@@ -534,7 +531,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                                                mkrel $ pure obj
                                Just (Left gitsha) -> 
                                        mkrel $ populategitsha gitsha tmpdir
-                       sendresponse p $
+                       sendresponse outputv $
                                maybe "" fromOsPath mp
                        let result' = result
                                { computeInputsUnavailable = 
@@ -630,6 +627,28 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
                        Just sz ->
                                progress $ BytesProcessed $ floor $ 
                                        fromIntegral sz * percent / 100
+       
+       withoutputv p a = do
+               outputv <- liftIO $ atomically newTQueue
+               let cleanup pid = do
+                       atomically $ writeTQueue outputv Nothing
+                       wait pid
+               bracket 
+                       (liftIO $ async $ sendoutput' p outputv)
+                       (liftIO . cleanup)
+                       (const $ a outputv)
+
+       sendoutput' p outputv =
+               atomically (readTQueue outputv) >>= \case
+                       Nothing -> return ()
+                       Just s -> do
+                               liftIO $ hPutStrLn (stdinHandle p) s
+                               liftIO $ hFlush (stdinHandle p)
+                               sendoutput' p outputv
+
+       sendresponse outputv s = do
+               fastDebug "Compute" ("> " ++ s)
+               liftIO $ atomically $ writeTQueue outputv (Just s)
 
 computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
 computationBehaviorChangeError (ComputeProgram program) requestdesc p =
index bba17b23006d39d0c6aa596e2b1fbbaa0ec59f76..f478c5d9668e60d7ee45a1ea09a5ce6bcbef447f 100644 (file)
@@ -1,11 +1,6 @@
 This is the remainder of my todo list while I was building the
 compute special remote. --[[Joey]]
 
-* git-annex responds to each INPUT immediately, and flushes stdout.
-  This could cause problems if the program is sending several INPUT
-  first, before reading responses, as is documented it should do to allow
-  for parallel get of the input files.
-
 * write a tip showing how to use this
 
 * Support parallel get of input files. The design allows for this,